package com.migrate.module.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Canal MQ message consumption task.
 *
 * <p>Registered as a Spring {@link ApplicationRunner}; its {@link #run(ApplicationArguments)}
 * method starts two background jobs on a dedicated thread pool: a {@code CanalPullRunner}
 * (pull task) and a {@code CanalPullCommitRunner} (commit task). Both receive the configured
 * topic and RocketMQ name server address.
 *
 * @author zhonghuashishan
 */
@Component
public class CanalConsumeTask implements ApplicationRunner
{
    /**
     * Number of background jobs submitted by {@link #run(ApplicationArguments)};
     * the pool is sized to exactly one thread per job.
     */
    private static final int WORKER_COUNT = 2;

    /**
     * Topic handed to both runners; read from the {@code migrateConsumerTopic}
     * property, falling back to {@code example} when the property is absent.
     */
    @Value("${migrateConsumerTopic:example}")
    private String topic;

    /**
     * RocketMQ name server address; read from {@code rocketmq.name-server},
     * falling back to {@code 127.0.0.1:9876} when the property is absent.
     */
    @Value("${rocketmq.name-server:127.0.0.1:9876}")
    private String nameServerUrl;

    /**
     * Submits the pull job and the commit job to a fixed-size thread pool and returns
     * without waiting for either of them.
     *
     * @param args application arguments supplied by the caller (not used)
     * @throws Exception declared by the {@link ApplicationRunner} contract
     */
    @Override
    public void run(ApplicationArguments args) throws Exception
    {
        ExecutorService executors = Executors.newFixedThreadPool(WORKER_COUNT);
        try
        {
            // Start the pull task.
            executors.execute(new CanalPullRunner(topic, nameServerUrl));
            // Start the commit task.
            executors.execute(new CanalPullCommitRunner(topic, nameServerUrl));
        }
        finally
        {
            // No further work is ever submitted to this pool. shutdown() does not
            // interrupt tasks that are already running; it only lets the worker
            // threads exit once their task ends, instead of idling forever on an
            // empty queue (the pool is otherwise unreachable after this method returns).
            executors.shutdown();
        }
    }
}
